Java 8 開始引入 Completablefuture,基本上就是針對 Future 進行加強,我們可以在非同步任務完成或發生異常時,自動調用 Callback 方法,為何我稱為 Callback 呢 ? 這不得不說...非常的 Functional,接下來讓我們看看原因為何。
來一個簡單的玩意
public class Example01 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("future1 -> " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future1 is done";
});
future1.thenAccept(message -> {
System.out.println("thenAccept1 -> " + message);
});
future1.thenAccept(message -> {
System.out.println("thenAccept2 -> " + message);
});
String message = future1.get();
System.out.println(message);
}
}
在這個範例,我們建立了一個 future1 物件,裡面我們等待了 2 秒,並且丟出相關字串,結果如下
future1 -> ForkJoinPool.commonPool-worker-1
thenAccept1 -> future1 is done
thenAccept2 -> future1 is done
future1 is done
接下來讓我們看看細節, thenAccept 是一個同步方法並且會回傳 future1 的結果,也就是 future1 is done,而因為我們針對 future1 建立了兩個 thenAccept 方法,因此我們將會得到上面的答案。
我們知道為何會這樣列印出字串,那如果有一天我們的 thenAccept 需要使用非同步呢? 那麼也很簡單,我們可以改成 thenAcceptAsync 就變成非同步了,讓我們看看結果會變成如何。
future1 -> ForkJoinPool.commonPool-worker-1
future1 is done
thenAccept2 -> future1 is done
thenAccept1 -> future1 is done
可以看到我們並沒有按照程式順序執行,也就代表我們非同步成功了,當然 future1 能接的也不只 thenAccept,Completablefuter 提供如 thenApplyAsync、thenRunAsync 等等方式,讓我們可以因應不同情境,建立更好的鏈結。
Oops! 發生錯誤
public class Example03 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
System.out.println("future1 -> " + Thread.currentThread().getName());
throw new RuntimeException("future1 is failure");
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future1 is done";
});
CompletableFuture<String> future2 = future1.exceptionally(e -> {
System.out.println("exceptionally -> " + e.getMessage());
return "future1 occur exception";
});
System.out.println(future2.get());
}
}
我們也知道,寫程式肯定要包含例外處理,在 Completablefuture 中也提供了一系列的錯誤處理方式,在這邊我們使用最簡單的 exceptionally 來實作,結果如下
future1 -> ForkJoinPool.commonPool-worker-1
exceptionally -> java.lang.RuntimeException: future1 is failure
future1 occur exception
程式碼跟剛剛大同小異,但我們丟出了 RuntimeException 來讓我們運行失敗,來讓我們達成效果。而大家還記得我上述有說鏈結兩字,所以我們可以看到 future1.exceptionally 會丟出一個 CompletableFuture 物件,我們稱之為 future2,其錯誤處理很簡單的印出字串以及回傳處理結果,這是一個簡潔的使用方式。
誰先來的 Either
public class Example04 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Random rand = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
System.out.println("future1 -> " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future1 is done";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
System.out.println("future2 -> " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future2 is done";
});
CompletableFuture<String> future3 = future1.applyToEitherAsync(future2, message -> {
return "future3 -> " + message;
});
System.out.println(future3.get());
}
}
Either 基本上表示,在任一 CompletionStage 完成,就會往下執行 action,因此我們看看該範例,兩個 Stage Thread Sleep 時間不固定,誰先回來就會顯是誰的回傳值,假設我們跑了兩次,結果可能如下
T0:
future1 -> ForkJoinPool.commonPool-worker-1
future3 -> future1 is done
T1:
future2 -> ForkJoinPool.commonPool-worker-2
future3 -> future2 is done
全部一起來吧!
public class Example05 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Random rand = new Random();
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
System.out.println("future1 -> " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future1 is done";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000 + rand.nextInt(1000));
System.out.println("future2 -> " + Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
return "future2 is done";
});
CompletableFuture<Void> future3 = CompletableFuture.allOf(future1, future2);
CompletableFuture<Object> future4 = CompletableFuture.anyOf(future1, future2);
System.out.println(future3.get());
System.out.println(future4.get());
}
}
有時候我們可能不會一步一步執行,我們可能想要一次執行全部並且等待,這時候就得使用到 allOf 了,若使用 allOf 將會等待所有 Future 回來才進行後續動作。反之,當我們今天想要任一個回來即可,則是使用 anyOf。
按照上述的說明以及程式碼,我們一樣執行兩次看看,結果如下
T0:
future2 -> ForkJoinPool.commonPool-worker-2
future1 -> ForkJoinPool.commonPool-worker-1
null
future2 is done
T1:
future1 -> ForkJoinPool.commonPool-worker-1
future2 -> ForkJoinPool.commonPool-worker-2
null
future1 is done
可以看到使用 allOf 確實有等待全部完成,而 anyOf 得到的答案有可能會是不同的,看哪個 Future 先執行完成,則印出該字串。
結論
謝謝大家把文章看完,看完之後可以看到整體 Completablefuture 使用了許多 Function 介面來執行,這種方式體現了 Functional 的強大,我們可以利用這種方式建立更簡單明瞭的請求鏈結。
當然,Completablefuture 的功能還不只如此,還有更多組合應用沒有說明到,可見這個特性之強大。
參考
https://www.liaoxuefeng.com/wiki/1252599548343744/1306581182447650
https://openhome.cc/Gossip/CodeData/JDK8/CompleteableFuture.html